今天要來用 ES 的 Ingest Pipeline 處理前面用來做 Auto complete 的 Tweeter 資料,先回顧一下:
資料的樣子
{'name': 'MaggieBreathnac',
 'user_id': 487990281,
 'tweet': 'Making memories #nanny #anpost #isolation #happy #corona @ An Rinn '
          'https://t.co/byA9uSVxFc',
 'tweet_id': 1244969855058677766,
 'retweets': 0,
 'favorites': 0,
 'created': '31-Mar-2020',
 'followers': 522,
 'is_user_verified': False,
 'geo': {'type': 'Point', 'coordinates': [52.04681279, -7.56678938]},
 'coordinates': {'type': 'Point', 'coordinates': [-7.56678938, 52.04681279]},
 'location': 'dublin',
 'primary_location': {'type': 'Point',
                      'coordinates': [-7.56678938, 52.04681279]}}
希望他變成的樣子
{
		"name": 'MaggieBreathnac',
		"user_id": 487990281,
		"tweet": 'Making memories #nanny #anpost #isolation #happy #corona @ An Rinn '
          'https://t.co/byA9uSVxFc',
		"tweet_id": 1244969855058677766,
		"retweets": 0,
		"favorites": 0,
		"created": '31-Mar-2020',
		"followers": 522,
		"is_user_verified": False,
		"geo": [-7.56678938, 52.04681279],
		"location": 'dublin'
}
python 做了什麼處理
def data_to_es(json_file: str, index_name: str):
    with open(json_file, 'r') as f:
        data = json.load(f)
    fields_to_rm = ['primary_location', 'coordinates']
    for d in data:
        for f in fields_to_rm:
            if d.get(f):
                d.pop(f)
        if d.get('geo'):
            d['geo'] = d['geo']['coordinates'].reverse()
        if d.get('created'):
            d['created'] = datetime.strptime(d['created'], "%d-%b-%Y")
        d['_index'] = index_name
        yield d
primary_location 和 coordinates 這兩個值這些轉換都蠻單純的,符合使用 ES ingest pipeline 的情境,以下是三個步驟的 Processor:
移除 primary_location 和 coordinates 這兩個值
  {
    "remove": {
      "field": [
        "primary_location",
        "coordinates"
      ]
    }
  }
轉換 geo.coordinates 內的值並寫為 geo 欄位
  {
    "json": {
      "field": "geo.coordinates",
      "target_field": "geo"
    }
  }
b. 將 geo 欄位內的經緯度反過來
  {
    "script": {
      "source": "ArrayList tmp = ctx[\"geo\"]; Collections.reverse(tmp); //ctx[\"geo\"]=[ctx[\"geo\"][1], ctx[\"geo\"][0]] ctx[\"geo\"] = tmp; "
    }
  }
解析 created 欄位為日期欄位
  {
    "date": {
      "field": "created",
      "formats": [
        "dd-MMM-yyyy"
      ],
      "target_field": "created"
    }
  }
這樣我們就完成了一條資料管線了,其中比較麻煩的是要的經緯度 arrayList 反序的 processor,需要理解 painless pipeline,但比起要為了這一條管線開一個環境安裝 python、維運 python script 單純多了。
這系列文章也終於告一個段落了,下一篇會來個總回顧~